1   /**
2    * Copyright 2014 Netflix, Inc.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package rx;
17  
18  import static org.junit.Assert.*;
19  
20  import java.util.List;
21  import java.util.concurrent.*;
22  import java.util.concurrent.atomic.*;
23  
24  import org.junit.*;
25  
26  import org.junit.rules.TestName;
27  import rx.Observable.OnSubscribe;
28  import rx.exceptions.MissingBackpressureException;
29  import rx.functions.*;
30  import rx.internal.util.RxRingBuffer;
31  import rx.observers.TestSubscriber;
32  import rx.schedulers.Schedulers;
33  import rx.test.TestObstructionDetection;
34  
35  public class BackpressureTests {
36  
    // JUnit rule exposing the name of the currently running test method; used to label console output.
    @Rule
    public TestName testName = new TestName();
39  
    /**
     * Runs after every test and invokes {@code TestObstructionDetection.checkObstruction()}.
     * NOTE(review): presumably this fails the test when a scheduler thread was left blocked by the
     * test that just ran - confirm against TestObstructionDetection.
     */
    @After
    public void doAfterTest() {
        TestObstructionDetection.checkObstruction();
    }
44      
45      @Test
46      public void testObserveOn() {
47          int NUM = (int) (RxRingBuffer.SIZE * 2.1);
48          AtomicInteger c = new AtomicInteger();
49          TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
50          incrementingIntegers(c).observeOn(Schedulers.computation()).take(NUM).subscribe(ts);
51          ts.awaitTerminalEvent();
52          ts.assertNoErrors();
53          System.out.println("testObserveOn => Received: " + ts.getOnNextEvents().size() + "  Emitted: " + c.get());
54          assertEquals(NUM, ts.getOnNextEvents().size());
55          assertTrue(c.get() < RxRingBuffer.SIZE * 4);
56      }
57  
58      @Test
59      public void testObserveOnWithSlowConsumer() {
60          int NUM = (int) (RxRingBuffer.SIZE * 0.2);
61          AtomicInteger c = new AtomicInteger();
62          TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
63          incrementingIntegers(c).observeOn(Schedulers.computation()).map(new Func1<Integer, Integer>() {
64  
65              @Override
66              public Integer call(Integer i) {
67                  try {
68                      Thread.sleep(1);
69                  } catch (InterruptedException e) {
70                      e.printStackTrace();
71                  }
72                  return i;
73              }
74  
75          }).take(NUM).subscribe(ts);
76          ts.awaitTerminalEvent();
77          ts.assertNoErrors();
78          System.out.println("testObserveOnWithSlowConsumer => Received: " + ts.getOnNextEvents().size() + "  Emitted: " + c.get());
79          assertEquals(NUM, ts.getOnNextEvents().size());
80          assertTrue(c.get() < RxRingBuffer.SIZE * 2);
81      }
82  
83      @Test
84      public void testMergeSync() {
85          int NUM = (int) (RxRingBuffer.SIZE * 4.1);
86          AtomicInteger c1 = new AtomicInteger();
87          AtomicInteger c2 = new AtomicInteger();
88          TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
89          Observable<Integer> merged = Observable.merge(incrementingIntegers(c1), incrementingIntegers(c2));
90  
91          merged.take(NUM).subscribe(ts);
92          ts.awaitTerminalEvent();
93          ts.assertNoErrors();
94          System.out.println("Expected: " + NUM + " got: " + ts.getOnNextEvents().size());
95          System.out.println("testMergeSync => Received: " + ts.getOnNextEvents().size() + "  Emitted: " + c1.get() + " / " + c2.get());
96          assertEquals(NUM, ts.getOnNextEvents().size());
97          // either one can starve the other, but neither should be capable of doing more than 5 batches (taking 4.1)
98          // TODO is it possible to make this deterministic rather than one possibly starving the other?
99          // benjchristensen => In general I'd say it's not worth trying to make it so, as "fair" algoritms generally take a performance hit
100         assertTrue(c1.get() < RxRingBuffer.SIZE * 5);
101         assertTrue(c2.get() < RxRingBuffer.SIZE * 5);
102     }
103 
104     @Test
105     public void testMergeAsync() {
106         int NUM = (int) (RxRingBuffer.SIZE * 4.1);
107         AtomicInteger c1 = new AtomicInteger();
108         AtomicInteger c2 = new AtomicInteger();
109         TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
110         Observable<Integer> merged = Observable.merge(
111                 incrementingIntegers(c1).subscribeOn(Schedulers.computation()),
112                 incrementingIntegers(c2).subscribeOn(Schedulers.computation()));
113 
114         merged.take(NUM).subscribe(ts);
115         ts.awaitTerminalEvent();
116         ts.assertNoErrors();
117         System.out.println("testMergeAsync => Received: " + ts.getOnNextEvents().size() + "  Emitted: " + c1.get() + " / " + c2.get());
118         assertEquals(NUM, ts.getOnNextEvents().size());
119         // either one can starve the other, but neither should be capable of doing more than 5 batches (taking 4.1)
120         // TODO is it possible to make this deterministic rather than one possibly starving the other?
121         // benjchristensen => In general I'd say it's not worth trying to make it so, as "fair" algoritms generally take a performance hit
122         assertTrue(c1.get() < RxRingBuffer.SIZE * 5);
123         assertTrue(c2.get() < RxRingBuffer.SIZE * 5);
124     }
125 
126     @Test
127     public void testMergeAsyncThenObserveOn() {
128         int NUM = (int) (RxRingBuffer.SIZE * 4.1);
129         AtomicInteger c1 = new AtomicInteger();
130         AtomicInteger c2 = new AtomicInteger();
131         TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
132         Observable<Integer> merged = Observable.merge(
133                 incrementingIntegers(c1).subscribeOn(Schedulers.computation()),
134                 incrementingIntegers(c2).subscribeOn(Schedulers.computation()));
135 
136         merged.observeOn(Schedulers.newThread()).take(NUM).subscribe(ts);
137         ts.awaitTerminalEvent();
138         ts.assertNoErrors();
139         System.out.println("testMergeAsyncThenObserveOn => Received: " + ts.getOnNextEvents().size() + "  Emitted: " + c1.get() + " / " + c2.get());
140         assertEquals(NUM, ts.getOnNextEvents().size());
141         // either one can starve the other, but neither should be capable of doing more than 5 batches (taking 4.1)
142         // TODO is it possible to make this deterministic rather than one possibly starving the other?
143         // benjchristensen => In general I'd say it's not worth trying to make it so, as "fair" algoritms generally take a performance hit
144         // akarnokd => run this in a loop over 10k times and never saw values get as high as 7*SIZE, but since observeOn delays the unsubscription non-deterministically, the test will remain unreliable
145         assertTrue(c1.get() < RxRingBuffer.SIZE * 7);
146         assertTrue(c2.get() < RxRingBuffer.SIZE * 7);
147     }
148 
149     @Test
150     public void testFlatMapSync() {
151         int NUM = (int) (RxRingBuffer.SIZE * 2.1);
152         AtomicInteger c = new AtomicInteger();
153         TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
154         incrementingIntegers(c).flatMap(new Func1<Integer, Observable<Integer>>() {
155 
156             @Override
157             public Observable<Integer> call(Integer i) {
158                 return incrementingIntegers(new AtomicInteger()).take(10);
159             }
160 
161         }).take(NUM).subscribe(ts);
162         ts.awaitTerminalEvent();
163         ts.assertNoErrors();
164         System.out.println("testFlatMapSync => Received: " + ts.getOnNextEvents().size() + "  Emitted: " + c.get());
165         assertEquals(NUM, ts.getOnNextEvents().size());
166         // expect less than 1 buffer since the flatMap is emitting 10 each time, so it is NUM/10 that will be taken.
167         assertTrue(c.get() < RxRingBuffer.SIZE);
168     }
169 
170     @Test
171     @Ignore // the test is non-deterministic and can't be made deterministic
172     public void testFlatMapAsync() {
173         int NUM = (int) (RxRingBuffer.SIZE * 2.1);
174         AtomicInteger c = new AtomicInteger();
175         TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
176         incrementingIntegers(c).subscribeOn(Schedulers.computation()).flatMap(new Func1<Integer, Observable<Integer>>() {
177 
178             @Override
179             public Observable<Integer> call(Integer i) {
180                 return incrementingIntegers(new AtomicInteger()).take(10).subscribeOn(Schedulers.computation());
181             }
182 
183         }).take(NUM).subscribe(ts);
184         ts.awaitTerminalEvent();
185         ts.assertNoErrors();
186         System.out.println("testFlatMapAsync => Received: " + ts.getOnNextEvents().size() + "  Emitted: " + c.get() + " Size: " + RxRingBuffer.SIZE);
187         assertEquals(NUM, ts.getOnNextEvents().size());
188         // even though we only need 10, it will request at least RxRingBuffer.SIZE, and then as it drains keep requesting more
189         // and then it will be non-deterministic when the take() causes the unsubscribe as it is scheduled on 10 different schedulers (threads)
190         // normally this number is ~250 but can get up to ~1200 when RxRingBuffer.SIZE == 1024
191         assertTrue(c.get() <= RxRingBuffer.SIZE * 2);
192     }
193 
194     @Test
195     public void testZipSync() {
196         int NUM = (int) (RxRingBuffer.SIZE * 4.1);
197         AtomicInteger c1 = new AtomicInteger();
198         AtomicInteger c2 = new AtomicInteger();
199         TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
200         Observable<Integer> zipped = Observable.zip(
201                 incrementingIntegers(c1),
202                 incrementingIntegers(c2),
203                 new Func2<Integer, Integer, Integer>() {
204 
205                     @Override
206                     public Integer call(Integer t1, Integer t2) {
207                         return t1 + t2;
208                     }
209 
210                 });
211 
212         zipped.take(NUM).subscribe(ts);
213         ts.awaitTerminalEvent();
214         ts.assertNoErrors();
215         System.out.println("testZipSync => Received: " + ts.getOnNextEvents().size() + "  Emitted: " + c1.get() + " / " + c2.get());
216         assertEquals(NUM, ts.getOnNextEvents().size());
217         assertTrue(c1.get() < RxRingBuffer.SIZE * 5);
218         assertTrue(c2.get() < RxRingBuffer.SIZE * 5);
219     }
220 
221     @Test
222     public void testZipAsync() {
223         int NUM = (int) (RxRingBuffer.SIZE * 2.1);
224         AtomicInteger c1 = new AtomicInteger();
225         AtomicInteger c2 = new AtomicInteger();
226         TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
227         Observable<Integer> zipped = Observable.zip(
228                 incrementingIntegers(c1).subscribeOn(Schedulers.computation()),
229                 incrementingIntegers(c2).subscribeOn(Schedulers.computation()),
230                 new Func2<Integer, Integer, Integer>() {
231 
232                     @Override
233                     public Integer call(Integer t1, Integer t2) {
234                         return t1 + t2;
235                     }
236 
237                 });
238 
239         zipped.take(NUM).subscribe(ts);
240         ts.awaitTerminalEvent();
241         ts.assertNoErrors();
242         System.out.println("testZipAsync => Received: " + ts.getOnNextEvents().size() + "  Emitted: " + c1.get() + " / " + c2.get());
243         assertEquals(NUM, ts.getOnNextEvents().size());
244         assertTrue(c1.get() < RxRingBuffer.SIZE * 3);
245         assertTrue(c2.get() < RxRingBuffer.SIZE * 3);
246     }
247 
248     @Test
249     public void testSubscribeOnScheduling() {
250         // in a loop for repeating the concurrency in this to increase chance of failure
251         for (int i = 0; i < 100; i++) {
252             int NUM = (int) (RxRingBuffer.SIZE * 2.1);
253             AtomicInteger c = new AtomicInteger();
254             ConcurrentLinkedQueue<Thread> threads = new ConcurrentLinkedQueue<Thread>();
255             TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
256             // observeOn is there to make it async and need backpressure
257             incrementingIntegers(c, threads).subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation()).take(NUM).subscribe(ts);
258             ts.awaitTerminalEvent();
259             ts.assertNoErrors();
260             System.out.println("testSubscribeOnScheduling => Received: " + ts.getOnNextEvents().size() + "  Emitted: " + c.get());
261             assertEquals(NUM, ts.getOnNextEvents().size());
262             assertTrue(c.get() < RxRingBuffer.SIZE * 4);
263             Thread first = null;
264             for (Thread t : threads) {
265                 System.out.println("testSubscribeOnScheduling => thread: " + t);
266                 if (first == null) {
267                     first = t;
268                 } else {
269                     if (!first.equals(t)) {
270                         fail("Expected to see the same thread");
271                     }
272                 }
273             }
274             System.out.println("testSubscribeOnScheduling => Number of batch requests seen: " + threads.size());
275             assertTrue(threads.size() > 1);
276             System.out.println("-------------------------------------------------------------------------------------------");
277         }
278     }
279 
280     @Test
281     public void testTakeFilterSkipChainAsync() {
282         int NUM = (int) (RxRingBuffer.SIZE * 2.1);
283         AtomicInteger c = new AtomicInteger();
284         TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
285         incrementingIntegers(c).observeOn(Schedulers.computation())
286                 .skip(10000)
287                 .filter(new Func1<Integer, Boolean>() {
288 
289                     @Override
290                     public Boolean call(Integer i) {
291                         return i > 11000;
292                     }
293 
294                 }).take(NUM).subscribe(ts);
295 
296         ts.awaitTerminalEvent();
297         ts.assertNoErrors();
298 
299         // emit 10000 that are skipped
300         // emit next 1000 that are filtered out
301         // take NUM
302         // so emitted is at least 10000+1000+NUM + extra for buffer size/threshold
303         int expected = 10000 + 1000 + RxRingBuffer.SIZE * 3 + RxRingBuffer.SIZE / 2;
304 
305         System.out.println("testTakeFilterSkipChain => Received: " + ts.getOnNextEvents().size() + "  Emitted: " + c.get() + " Expected: " + expected);
306         assertEquals(NUM, ts.getOnNextEvents().size());
307         assertTrue(c.get() < expected);
308     }
309 
310     @Test
311     public void testUserSubscriberUsingRequestSync() {
312         AtomicInteger c = new AtomicInteger();
313         final AtomicInteger totalReceived = new AtomicInteger();
314         final AtomicInteger batches = new AtomicInteger();
315         final AtomicInteger received = new AtomicInteger();
316         incrementingIntegers(c).subscribe(new Subscriber<Integer>() {
317 
318             @Override
319             public void onStart() {
320                 request(100);
321             }
322 
323             @Override
324             public void onCompleted() {
325 
326             }
327 
328             @Override
329             public void onError(Throwable e) {
330 
331             }
332 
333             @Override
334             public void onNext(Integer t) {
335                 int total = totalReceived.incrementAndGet();
336                 received.incrementAndGet();
337                 if (total >= 2000) {
338                     unsubscribe();
339                 }
340                 if (received.get() == 100) {
341                     batches.incrementAndGet();
342                     request(100);
343                     received.set(0);
344                 }
345             }
346 
347         });
348 
349         System.out.println("testUserSubscriberUsingRequestSync => Received: " + totalReceived.get() + "  Emitted: " + c.get() + " Request Batches: " + batches.get());
350         assertEquals(2000, c.get());
351         assertEquals(2000, totalReceived.get());
352         assertEquals(20, batches.get());
353     }
354 
355     @Test
356     public void testUserSubscriberUsingRequestAsync() throws InterruptedException {
357         AtomicInteger c = new AtomicInteger();
358         final AtomicInteger totalReceived = new AtomicInteger();
359         final AtomicInteger received = new AtomicInteger();
360         final AtomicInteger batches = new AtomicInteger();
361         final CountDownLatch latch = new CountDownLatch(1);
362         incrementingIntegers(c).subscribeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {
363 
364             @Override
365             public void onStart() {
366                 request(100);
367             }
368 
369             @Override
370             public void onCompleted() {
371                 latch.countDown();
372             }
373 
374             @Override
375             public void onError(Throwable e) {
376                 latch.countDown();
377             }
378 
379             @Override
380             public void onNext(Integer t) {
381                 int total = totalReceived.incrementAndGet();
382                 received.incrementAndGet();
383                 boolean done = false;
384                 if (total >= 2000) {
385                     done = true;
386                     unsubscribe();
387                 }
388                 if (received.get() == 100) {
389                     batches.incrementAndGet();
390                     received.set(0);
391                     if (!done) {
392                         request(100);
393                     }
394                 }
395                 if (done) {
396                     latch.countDown();
397                 }
398             }
399 
400         });
401 
402         latch.await();
403         System.out.println("testUserSubscriberUsingRequestAsync => Received: " + totalReceived.get() + "  Emitted: " + c.get() + " Request Batches: " + batches.get());
404         assertEquals(2000, c.get());
405         assertEquals(2000, totalReceived.get());
406         assertEquals(20, batches.get());
407     }
408 
409     @Test(timeout = 2000)
410     public void testFirehoseFailsAsExpected() {
411         AtomicInteger c = new AtomicInteger();
412         TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
413         firehose(c).observeOn(Schedulers.computation()).map(SLOW_PASS_THRU).subscribe(ts);
414         ts.awaitTerminalEvent();
415         System.out.println("testFirehoseFailsAsExpected => Received: " + ts.getOnNextEvents().size() + "  Emitted: " + c.get());
416         assertEquals(1, ts.getOnErrorEvents().size());
417         assertTrue(ts.getOnErrorEvents().get(0) instanceof MissingBackpressureException);
418     }
419 
420     @Test(timeout = 10000)
421     public void testOnBackpressureDrop() {
422         for (int i = 0; i < 100; i++) {
423             int NUM = (int) (RxRingBuffer.SIZE * 1.1); // > 1 so that take doesn't prevent buffer overflow
424             AtomicInteger c = new AtomicInteger();
425             TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
426             firehose(c).onBackpressureDrop()
427             .observeOn(Schedulers.computation())
428             .map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
429             ts.awaitTerminalEvent();
430             ts.assertNoErrors();
431 
432             List<Integer> onNextEvents = ts.getOnNextEvents();
433             assertEquals(NUM, onNextEvents.size());
434 
435             Integer lastEvent = onNextEvents.get(NUM - 1);
436 
437             System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + "  Emitted: " + c.get() + " Last value: " + lastEvent);
438             // it drop, so we should get some number far higher than what would have sequentially incremented
439             assertTrue(NUM - 1 <= lastEvent.intValue());
440         }
441     }
442 
443     @Test(timeout = 10000)
444     public void testOnBackpressureDropWithAction() {
445         for (int i = 0; i < 100; i++) {
446             final AtomicInteger emitCount = new AtomicInteger();
447             final AtomicInteger dropCount = new AtomicInteger();
448             final AtomicInteger passCount = new AtomicInteger();
449             final int NUM = RxRingBuffer.SIZE * 3; // > 1 so that take doesn't prevent buffer overflow
450             TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
451             firehose(emitCount).onBackpressureDrop(new Action1<Integer>() {
452                 @Override
453                 public void call(Integer i) {
454                     dropCount.incrementAndGet();
455                 }
456             })
457             .doOnNext(new Action1<Integer>() {
458                 @Override
459                 public void call(Integer integer) {
460                     passCount.incrementAndGet();
461                 }
462             })
463             .observeOn(Schedulers.computation())
464             .map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
465             ts.awaitTerminalEvent();
466             ts.assertNoErrors();
467             
468             List<Integer> onNextEvents = ts.getOnNextEvents();
469             Integer lastEvent = onNextEvents.get(NUM - 1);
470             System.out.println(testName.getMethodName() + " => Received: " + onNextEvents.size() + " Passed: " + passCount.get() + " Dropped: " + dropCount.get() + "  Emitted: " + emitCount.get() + " Last value: " + lastEvent);
471             assertEquals(NUM, onNextEvents.size());
472             // in reality, NUM < passCount
473             assertTrue(NUM <= passCount.get());
474             // it drop, so we should get some number far higher than what would have sequentially incremented
475             assertTrue(NUM - 1 <= lastEvent.intValue());
476             assertTrue(0 < dropCount.get());
477             assertEquals(emitCount.get(), passCount.get() + dropCount.get());
478         }
479     }
480 
481     @Test(timeout = 10000)
482     public void testOnBackpressureDropSynchronous() {
483         for (int i = 0; i < 100; i++) {
484             int NUM = (int) (RxRingBuffer.SIZE * 1.1); // > 1 so that take doesn't prevent buffer overflow
485             AtomicInteger c = new AtomicInteger();
486             TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
487             firehose(c).onBackpressureDrop()
488             .map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
489             ts.awaitTerminalEvent();
490             ts.assertNoErrors();
491 
492             List<Integer> onNextEvents = ts.getOnNextEvents();
493             assertEquals(NUM, onNextEvents.size());
494 
495             Integer lastEvent = onNextEvents.get(NUM - 1);
496 
497             System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + "  Emitted: " + c.get() + " Last value: " + lastEvent);
498             // it drop, so we should get some number far higher than what would have sequentially incremented
499             assertTrue(NUM - 1 <= lastEvent.intValue());
500         }
501     }
502 
503     @Test(timeout = 10000)
504     public void testOnBackpressureDropSynchronousWithAction() {
505         for (int i = 0; i < 100; i++) {
506             final AtomicInteger dropCount = new AtomicInteger();
507             int NUM = (int) (RxRingBuffer.SIZE * 1.1); // > 1 so that take doesn't prevent buffer overflow
508             AtomicInteger c = new AtomicInteger();
509             TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
510             firehose(c).onBackpressureDrop(new Action1<Integer>() {
511                 @Override
512                 public void call(Integer i) {
513                     dropCount.incrementAndGet();
514                 }
515             })
516                     .map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
517             ts.awaitTerminalEvent();
518             ts.assertNoErrors();
519 
520             List<Integer> onNextEvents = ts.getOnNextEvents();
521             assertEquals(NUM, onNextEvents.size());
522 
523             Integer lastEvent = onNextEvents.get(NUM - 1);
524 
525             System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + " Dropped: " + dropCount.get() + "  Emitted: " + c.get() + " Last value: " + lastEvent);
526             // it drop, so we should get some number far higher than what would have sequentially incremented
527             assertTrue(NUM - 1 <= lastEvent.intValue());
528             // no drop in synchronous mode
529             assertEquals(0, dropCount.get());
530             assertEquals(c.get(), onNextEvents.size());
531         }
532     }
533 
534     @Test(timeout = 2000)
535     public void testOnBackpressureBuffer() {
536         int NUM = (int) (RxRingBuffer.SIZE * 1.1); // > 1 so that take doesn't prevent buffer overflow
537         AtomicInteger c = new AtomicInteger();
538         TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
539         firehose(c).takeWhile(new Func1<Integer, Boolean>() {
540 
541             @Override
542             public Boolean call(Integer t1) {
543                 return t1 < 100000;
544             }
545 
546         }).onBackpressureBuffer().observeOn(Schedulers.computation()).map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
547         ts.awaitTerminalEvent();
548         ts.assertNoErrors();
549         System.out.println("testOnBackpressureBuffer => Received: " + ts.getOnNextEvents().size() + "  Emitted: " + c.get());
550         assertEquals(NUM, ts.getOnNextEvents().size());
551         // it buffers, so we should get the right value sequentially
552         assertEquals(NUM - 1, ts.getOnNextEvents().get(NUM - 1).intValue());
553     }
554 
    /**
     * A synchronous Observable that will emit incrementing integers as requested.
     * Delegates to the two-argument overload, passing {@code null} so that requesting threads
     * are not recorded.
     * 
     * @param counter
     *            incremented once for every value handed to the subscriber
     * @return the request-driven Observable
     */
    private static Observable<Integer> incrementingIntegers(final AtomicInteger counter) {
        return incrementingIntegers(counter, null);
    }
564 
565     private static Observable<Integer> incrementingIntegers(final AtomicInteger counter, final ConcurrentLinkedQueue<Thread> threadsSeen) {
566         return Observable.create(new OnSubscribe<Integer>() {
567 
568             final AtomicLong requested = new AtomicLong();
569 
570             @Override
571             public void call(final Subscriber<? super Integer> s) {
572                 s.setProducer(new Producer() {
573                     int i = 0;
574 
575                     @Override
576                     public void request(long n) {
577                         if (n == 0) {
578                             // nothing to do
579                             return;
580                         }
581                         if (threadsSeen != null) {
582                             threadsSeen.offer(Thread.currentThread());
583                         }
584                         long _c = requested.getAndAdd(n);
585                         if (_c == 0) {
586                             while (!s.isUnsubscribed()) {
587                                 counter.incrementAndGet();
588                                 s.onNext(i++);
589                                 if (requested.decrementAndGet() == 0) {
590                                     // we're done emitting the number requested so return
591                                     return;
592                                 }
593                             }
594                         }
595                     }
596 
597                 });
598             }
599 
600         });
601     }
602 
603     /**
604      * Incrementing int without backpressure.
605      * 
606      * @param counter
607      * @return
608      */
609     private static Observable<Integer> firehose(final AtomicInteger counter) {
610         return Observable.create(new OnSubscribe<Integer>() {
611 
612             int i = 0;
613 
614             @Override
615             public void call(final Subscriber<? super Integer> s) {
616                 while (!s.isUnsubscribed()) {
617                     s.onNext(i++);
618                     counter.incrementAndGet();
619                 }
620                 System.out.println("unsubscribed after: " + i);
621             }
622 
623         });
624     }
625 
    /**
     * Identity function that performs a fixed amount of string/hash busy-work before returning its
     * argument unchanged; the tests use it to make the consumer slower than the producer.
     */
    final static Func1<Integer, Integer> SLOW_PASS_THRU = new Func1<Integer, Integer>() {
        // read at the start and written at the end of every call;
        // NOTE(review): presumably volatile so the JIT cannot optimize the busy loop away - confirm
        volatile int sink;
        @Override
        public Integer call(Integer t1) {
            // be slow ... but faster than Thread.sleep(1)
            String t = "";
            int s = sink;
            // 1001 iterations, each building a new String from the previous one's hash code
            for (int i = 1000; i >= 0; i--) {
                t = String.valueOf(i + t.hashCode() + s);
            }
            sink = t.hashCode();
            // the input is passed through untouched
            return t1;
        }

    };
641 }